package org.msgcenter.mc.actor;


import cn.hutool.core.util.ObjectUtil;
import cn.hutool.http.HttpStatus;
import cn.hutool.json.JSONUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisAPI;
import lombok.extern.slf4j.Slf4j;
import org.msgcenter.mc.common.JsonResult;
import org.msgcenter.mc.ds.SiteMsg;

import java.util.ArrayList;

@Slf4j
public class MsgRedisActor extends AbstractVerticle {

  private RedisAPI api;

  private final String SiteMsgIdKey = "SITE_MSG:MSG_ID";

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    super.start(startPromise);
    Redis.createClient(vertx, "redis://127.0.0.1:6379").connect()
      .onSuccess(conn -> {
        api = RedisAPI.api(conn);
        log.info("连接redis成功");
      })
      .onFailure(err -> log.error("连接redis发生错误: {}", err.getMessage()));

    EventBus eb = vertx.eventBus();

    onSiteMsgIdCheck(eb);
  }

  // 站点消息id去重
  private void onSiteMsgIdCheck(EventBus eb) {
    MessageConsumer<String> consumer = eb.consumer("site_msg.msg_id.check");

    consumer.handler(msg -> {
      SiteMsg siteMsg = JSONUtil.toBean(msg.body(), SiteMsg.class);

      api
        .sismember(SiteMsgIdKey, siteMsg.getMsgId().toString())
        .compose(res -> {
          Long existed = res.toLong();
          if (existed == 1) {
            log.error("重复站点消息id {}", siteMsg.getMsgId());
            return Future.succeededFuture(new JsonResult<>(0, "站点消息id重复", Boolean.FALSE));
          }

          ArrayList<String> list = new ArrayList<>();
          list.add(SiteMsgIdKey);
          list.add(siteMsg.getMsgId().toString());
          api.sadd(list);
          return Future.succeededFuture(new JsonResult<>(HttpStatus.HTTP_OK, "站点消息id可用", Boolean.TRUE));
        })
        .onSuccess(res -> msg.reply(JSONUtil.toJsonStr(res)));
    });
  }

  @Override
  public void stop(Promise<Void> stopPromise) throws Exception {
    super.stop(stopPromise);

    if (ObjectUtil.isNotNull(api)) {
      api.close();
    }
  }
}
